package com.atguigu.gmall.realtime.app.dwd;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.atguigu.gmall.realtime.app.func.DimSink;
import com.atguigu.gmall.realtime.app.func.MyStringDebeziumDeserializationSchema;
import com.atguigu.gmall.realtime.app.func.TableProcessFunction;
import com.atguigu.gmall.realtime.bean.TableProcess;
import com.atguigu.gmall.realtime.utils.MyKafka;
import com.atguigu.gmall.realtime.utils.MyKafkaPro;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

/* Reads the business-change records that Maxwell captured and wrote to Kafka,
   then splits the stream: fact-table records are written back to Kafka, while
   dimension-table records are written to HBase (as key/value via Phoenix).
   Flink CDC dynamically monitors the config table so the job can decide, per
   table, whether it is a dimension table or a fact table. */
public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 读取ods_base_db_m 中的数据作为消费者
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        String topic ="ods_base_db_m";
        String groupID="BaseDBApp";

        FlinkKafkaConsumer<String> sourceFunction = MyKafka.getFlinkKafkaConsumer(topic, groupID);

        DataStreamSource<String> kafkaDSteam = env.addSource(sourceFunction);
        //对流中的数据进行转换，转换为json格式
        SingleOutputStreamOperator jsonObjectDSteam = kafkaDSteam.map(r -> JSONObject.parseObject(r));

        //测试是否可以被消费.
        //jsonObjectDSteam.print();
        //对转换后的数据进行清洗，
        SingleOutputStreamOperator filterDSteam = jsonObjectDSteam.filter(
                new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        Boolean flag = jsonObject.getString("table") != null
                                && jsonObject.getString("table").length() > 0
                                && jsonObject.getJSONObject("data") != null
                                && jsonObject.getString("data").length() > 4;
                        return flag;
                    }
                }
        );

       /* {
            "database": "gmall1021",
                "xid": 713,
                "data": {
            "area_code": "115454",
                    "region_id": "2",
                    "iso_3166_2": "ad",
                    "id": 35,
                    "iso_code": "cn-24"
        },
            "commit": true,
                "type": "insert",
                "table": "base_province",
                "ts": 1618923929
        }*/

        //filterDSteam.print("过滤后的数据.............................");


        //TODO 2 通过flink_cdc动态读取配置表中的数据


        //读取输入环境
        SourceFunction<String> sourceFunctionsql = MySQLSource.<String>builder()
                .hostname("hadoop104")
                .port(3306)
                //可以监控多个数据库.
                .databaseList("gmall1021_realtime") // monitor all tables under inventory database
                //因为可以输入的多个数据库中可能有表明重复，所以表前需要+数据库名.
                .tableList("gmall1021_realtime.table_process")
                .username("root")
                .password("000000")
                //initial 当启动时都数据库，可以读历史数据
                //earliest 从binlog开始读
                //latest //从binlog末尾读.
                .startupOptions(StartupOptions.initial())
                .deserializer(new MyStringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        DataStreamSource<String> mySqlDStram = env.addSource(sourceFunctionsql);
        mySqlDStram.print();
        //TODO 3  将动态读入的配置表信息向下游广播，存入状态中，使得主流可以获取表的信息进行判断.

        //创建一个状态描述器,输入类型k.v，value应该等于表的实体类也就是tableProcess类
        MapStateDescriptor<String, TableProcess> mapStatusDescriptor = new MapStateDescriptor<String, TableProcess>("mapStatusDescriptor", String.class, TableProcess.class);

        BroadcastStream<String> broadcastDStream = mySqlDStram.broadcast(mapStatusDescriptor);//传入一个状态描述器。可以用来储存表的状态.

        //将主流和广播流连接在一起
        BroadcastConnectedStream connectDStream = filterDSteam.connect(broadcastDStream);

        //对流的数据进行分流处理， 维度数据-侧输出流，事实数据-主流
        //定义一个侧输出流
        OutputTag<JSONObject> dimOutputTag=new OutputTag<JSONObject>("dimTag") {};
        //在TableProcessFunction中对两个流的数据进行处理.
        SingleOutputStreamOperator splitDStream = connectDStream.process(new TableProcessFunction(dimOutputTag,mapStatusDescriptor));

//6.5 获取侧输出流
        DataStream<JSONObject> dimDS = splitDStream.getSideOutput(dimOutputTag);

        splitDStream.print(">>>>");
        dimDS.print("#####");

        //TODO 7.将维度侧输出流的数据插入到Phoenix中
        dimDS.addSink(new DimSink());

        //TODO 8 将事实数据写入到kafak
        //自定义序列化输出序列话方式.
        splitDStream.addSink(MyKafkaPro.getFlinkKafkaSink(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
                //获取要输入的topic
                String topic = jsonObj.getString("sink_table");
                //获取数据，也就是表的数据信息。
                JSONObject datajsonObject = jsonObj.getJSONObject("data");
                return new ProducerRecord<byte[], byte[]>(topic,datajsonObject.toString().getBytes());
            }
        }
        ));


        env.execute();


    }
}
